---
title: Spark 架构分析
description: 分析 Spark 架构，包括 SQL 解析、逻辑计划、物理计划、执行计划等。
date: 2025-07-04 15:32
tags: ["大数据", "Spark", "架构分析"]
published: true
status: evergreen
---

# SQL 解析
解析阶段将 SQL 文本转换为未解析的逻辑计划。Spark SQL 使用 ANTLR4 解析 SQL 语句，生成抽象语法树（AST）。具体的流程为：
1. SqlBaseLexer 分词
2. SqlBaseParser SQL 解析
3. AstBuilder Ast 语法树构建
## SqlBaseLexer 分词
SqlBaseLexer：[SqlBaseLexer](https://github.com/apache/spark/blob/0ef86631/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4)

SqlBaseLexer 对 SQL 文本执行词法分析，将其分解为诸如关键字、标识符、文本和运算符等标记。它是从 SqlBaseLexer.g4 中的语法定义生成的。

**主要特点：**

1. 检测关键字和保留字
2. 文本（字符串、数字等）的标记识别
3. 特殊字符和运算符的处理
4. 支持注释和空格
5. 词法分析器包含用于处理边缘情况（如十进制文本）和检测未闭合括号注释的特殊方法。

## SqlBaseParser SQL 解析器

SqlBaseParser 通过根据语法规则将标记组织到分层结构中来执行句法分析。它是根据 SqlBaseParser.g4 中的语法定义生成的。

**主要特点：**

1. 解析 DML 语句（SELECT、INSERT、UPDATE、DELETE）
2. 解析 DDL 语句（CREATE、ALTER、DROP）
3. 支持表达式、联接类型、窗口函数等。
4. 子查询和 CTE（公共表表达式）的处理
5. 解析器生成表示 SQL 语句结构的解析树（抽象语法树或 AST）。

## AstBuilder Ast 语法树构建

AstBuilder 是 ANTLR 解析树和 Spark 的逻辑计划表示之间的桥梁。它使用**访问者模式**遍历解析树并构建相应的逻辑计划。

**重要方法：**

1. visitSingleStatement()：解析单个 SQL 语句的入口点
2. visitQuery()：处理 SELECT 查询
3. visitInsertInto()：处理 INSERT 语句
4. visitExpression()：将 SQL 表达式转换为 Catalyst 表达式
5. AstBuilder 会创建未解析的逻辑计划，其中对表、列和函数的引用尚未连接到实际数据或实现。

# SQL 分析
Spark的分析阶段（Analysis Phase）是Spark SQL查询处理流程中的核心环节，负责将用户提交的未解析逻辑计划（Unresolved LogicalPlan）转换为已解析的逻辑计划（Analyzed LogicalPlan）。这一过程通过多批次规则（Batches of Rules）逐步解决逻辑计划中的未绑定元素（如表名、字段名、类型等），最终生成一个语义明确、可执行的逻辑计划。

## 未解析逻辑计划的来源

用户提交的SQL或DataFrame操作会经过解析器（Parser）生成**语法树**，此时树中的表、字段等信息尚未与真实数据源关联。例如：

- 表名可能未与Catalog中的元数据匹配；
- 字段的类型或存在性未被验证；
- 函数名未绑定到实际实现方法

## 分析阶段的核心流程

分析阶段通过\*\*规则引擎（RuleExecutor）\*\*逐批应用规则，逐步“绑定”未解析元素。关键步骤如下：

####  **（1）规则批次（Batches）的划分**

Analyzer中预定义了多个规则批次，每个批次包含一组相关规则，按顺序执行。常见批次包括：

- **Hints处理**：处理如`/*+ BROADCASTJOIN */`等优化提示。
- **简单检查（Simple Sanity Check）** ：验证函数是否存在、语法合法性等。
- **解析（Resolution）** ：核心批次，绑定表名、字段名、分配唯一ID等。
- **后处理（Post-Hoc Resolution）** ：用户自定义的扩展规则 36 28。

####  **（2）规则应用方式**

- **自顶向下或自底向上遍历**：例如，`ResolveRelations`规则通过遍历语法树，将未解析的表名（如`UnresolvedRelation`）替换为Catalog中注册的实际数据源逻辑计划 14。
- **模式匹配（Pattern Matching）** ：规则通过Scala的模式匹配机制匹配特定子树结构。例如，匹配`Filter`节点并下推过滤条件 23。
- **迭代执行**：某些规则需多次应用直至达到“固定点”（FixedPoint），即逻辑计划不再变化 28。

####  **（3）关键解析规则举例**

- **ResolveRelations**：解析表名，检查Catalog是否存在对应表或视图，并替换为具体数据源的逻辑计划。
- **ResolveReferences**：绑定字段名到具体列，验证字段是否存在及类型是否兼容。
- **ResolveFunctions**：验证函数是否存在，并绑定到实际实现。
- **ImplicitTypeCasts**：自动插入类型转换（如将`String`隐式转为`Date`）

## 案例

假设 `person` 表已注册到 Spark 的 Catalog 中（例如通过 `spark.sql("CREATE TABLE person (id INT, name STRING)")`）然后以如下的 Sql 语句来说明

```sql
select id from person
```

通过分词以及解析后，得到了一个未解析的逻辑计划

```scala
== Unresolved Logical Plan ==
'Project ['id]
+- 'UnresolvedRelation [person]  // 未解析的表名
```

- **表名未绑定**：`UnresolvedRelation("person")` 尚未关联到 Catalog 中的实际数据源。
- **字段未验证**：`'id` 是 `UnresolvedAttribute`，尚未绑定到具体表的列。

### 应用**ResolveRelations 规则**

**ResolveRelations**：解析表名，检查Catalog是否存在对应表或视图，并替换为具体数据源的逻辑计划。

**规则目标**：将 `UnresolvedRelation` 绑定到 Catalog 中注册的表或视图。

1. Analyzer 遍历逻辑计划，发现 `UnresolvedRelation("person")`。
2. 查询 Catalog（如 Hive Metastore 或 Spark 内置 Catalog），发现 `person` 表存在，其 Schema 为 `(id INT, name STRING)`。
3. 将 `UnresolvedRelation` 替换为具体的 `LogicalRelation`（表示数据源的逻辑结构）。

**更新后的逻辑计划**：

```scala
== Analyzed Logical Plan (Partial) ==
'Project ['id]
+- SubqueryAlias person  // 绑定到具体表的逻辑计划
   +- LogicalRelation (id INT, name STRING)  // 数据源的逻辑表示
```

- **表名已绑定**：`person` 表已被替换为 `SubqueryAlias` 和 `LogicalRelation`。
- **字段仍处于未解析状态**：`'id` 仍然是 `UnresolvedAttribute`。

### 应用`ResolveReferences` **规则**

**规则目标**：将 `UnresolvedAttribute` 绑定到具体表的列。

1. Analyzer 遍历逻辑计划，发现 `'id` 是未解析字段。
2. 向上查找表的 Schema（从父节点 `SubqueryAlias person` 的 `LogicalRelation` 中获取 Schema）。
3. 检查 `person` 表的 Schema 是否包含 `id` 列：

- 如果存在，将 `'id` 替换为 `AttributeReference`（包含唯一 ID 和数据类型）。
- 如果不存在，抛出 `AnalysisException`（如 `Column 'id' not found`）。

**更新后的逻辑计划**：

```scala
== Analyzed Logical Plan ==
Project [id#10]  // 绑定到具体列（id INT）
+- SubqueryAlias person
+- LogicalRelation (id#10 INT, name#11 STRING)
```

此时：

- **字段已绑定**：`id` 对应到 `LogicalRelation` 中的 `id#10`（`AttributeReference`）。
- **分配唯一 ID**：`id#10` 是全局唯一的表达式 ID（`ExprId`），避免名称冲突。
- **数据类型明确**：`id` 的类型确定为 `INT`。

### 扩展

Spark 允许用户注入自定义规则。例如，定义一个规则 `ResolveSpecialColumns`，将 `SELECT special_id FROM person` 中的 `special_id` 映射到 `id`：

```scala
object ResolveSpecialColumns extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case Project(projectList, child) =>
      Project(projectList.map {
        case UnresolvedAttribute("special_id") => child.output.find(_.name == "id").get
        case other => other
      }, child)
  }
}

// 注册自定义规则到 Analyzer
spark.sessionState.analyzer.addResolutionRule(ResolveSpecialColumns)
```

封装为 jar 包，注册到 Spark 中。

# 逻辑优化

当 Spark 分析完成后，生成了对应的已经分析后的逻辑计划，然后通过Catalyst 优化器对解析的逻辑计划进行优化，从而将计划转换为更高效的形式，即**解析后的逻辑计划-&gt;优化后的逻辑计划，很多的优化方式都是 SQL 中常用的优化方式**。

Apache Spark的Catalyst优化器通过多种优化方法提升查询性能，结合**规则优化**（Rule-Based）和**成本优化**（Cost-Based）策略，其常用优化方法及具体实例如下：

## 谓词下推

将过滤条件尽可能下推到数据源附近，减少后续处理的数据量。例如，在读取Parquet或Hive表时，直接跳过不满足条件的行。

```sql
SELECT * FROM table WHERE age > 30 AND country = 'US'
```

优化后，Spark可能直接在数据源层面应用`country = 'US'`过滤，减少数据加载。

## **列裁剪（Column Pruning）**

仅读取查询中涉及的列，忽略无关列以减少I/O和内存开销。

**示例**：查询`SELECT name FROM employees`时，即使表中包含`age`或`salary`等字段，Catalyst会跳过这些列的读取。

## **常量折叠（Constant Folding）**

在编译阶段计算常量表达式，避免运行时重复计算。

**示例**：`SELECT salary * 0.1 * 12`中的`0.1 * 12`会被提前计算为`1.2`，运行时直接计算`salary * 1.2`。

## **操作符合并与简化**

- **CombineFilters**：合并连续的过滤条件。例如，`Filter(age > 30, Filter(age < 50, child))`合并为`Filter(age > 30 AND age < 50, child)`，减少多次扫描。
- **CollapseRepartition**：合并相邻的`repartition`操作，避免不必要的Shuffle 。
- **LikeSimplification**：将`LIKE 'abc%'`转换为更高效的`StartsWith("abc")` 。

## 案例

```sql
SELECT 
  o.order_id, 
  u.name, 
  o.amount 
FROM 
  orders o 
JOIN 
  users u 
ON 
  o.user_id = u.user_id 
WHERE 
  u.country = 'US' 
  AND o.amount > 100 
  AND o.order_date >= '2024-01-01'
```

Spark 解析 SQL 后生成的原始逻辑计划如下

```scala
== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
   +- Join Inner (user_id#11 = user_id#23)
      :- SubqueryAlias o
      :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
      +- SubqueryAlias u
         +- Relation[user_id#23, name#24, age#25, country#26] parquet
```

- 按照 SQL 的书写顺序生成计划：先执行 `JOIN`，然后执行 `Filter`，最后执行 `Project`。
- 所有过滤条件（`country`, `amount`, `order_date`）均位于 `JOIN` 之后，可能导致 `JOIN` 处理大量冗余数据。

Catalyst 应用优化规则后，逻辑计划变为：

```scala
== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
   :- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
   :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
   +- Filter (country#26 = US)
      +- Relation[user_id#23, name#24, age#25, country#26] parquet
```

以下通过一个具体的 SQL 查询案例，对比**解析后的原始逻辑计划**和**优化后的逻辑计划**，详细说明 Catalyst 优化器在逻辑计划层面的优化过程。

### **案例背景**

假设我们有两个表：

- `orders` 表（字段：`order_id`, `user_id`, `amount`, `order_date`）
- `users` 表（字段：`user_id`, `name`, `age`, `country`）

执行如下查询：

```sql
SELECT 
  o.order_id, 
  u.name, 
  o.amount 
FROM 
  orders o 
JOIN 
  users u 
ON 
  o.user_id = u.user_id 
WHERE 
  u.country = 'US' 
  AND o.amount > 100 
  AND o.order_date >= '2024-01-01'
```

### **1. 解析后的原始逻辑计划**

Spark 解析 SQL 后生成的原始逻辑计划如下（简化表示）：

```scala
== Parsed Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Filter (country#25 = US AND amount#12 > 100 AND order_date#13 >= 2024-01-01)
   +- Join Inner (user_id#11 = user_id#23)
      :- SubqueryAlias o
      :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
      +- SubqueryAlias u
         +- Relation[user_id#23, name#24, age#25, country#26] parquet
```

**特点**：

- 按照 SQL 的书写顺序生成计划：先执行 `JOIN`，然后执行 `Filter`，最后执行 `Project`。
- 所有过滤条件（`country`, `amount`, `order_date`）均位于 `JOIN` 之后，可能导致 `JOIN` 处理大量冗余数据。

### **2. 优化后的逻辑计划**

Catalyst 应用优化规则后，逻辑计划变为：

```scala
== Optimized Logical Plan ==
Project [order_id#10, name#24, amount#12]
+- Join Inner (user_id#11 = user_id#23)
   :- Filter (amount#12 > 100 AND order_date#13 >= 2024-01-01)
   :  +- Relation[order_id#10, user_id#11, amount#12, order_date#13] parquet
   +- Filter (country#26 = US)
      +- Relation[user_id#23, name#24, age#25, country#26] parquet
```

**关键优化步骤**：

1. **谓词下推（Predicate Pushdown）**

- 将 `Filter (country = 'US')` 下推到 `users` 表的数据源读取阶段，直接过滤掉非美国用户的数据。
- 将 `Filter (amount > 100 AND order_date >= '2024-01-01')` 下推到 `orders` 表的数据源读取阶段，减少参与 `JOIN` 的订单数据量。
- 优化后，`JOIN` 操作仅处理过滤后的数据。

2. **列裁剪（Column Pruning）**

- `orders` 表仅保留 `order_id`, `user_id`, `amount`, `order_date`（查询中引用的字段）。若表中还有其他字段（如 `product_id`），会被自动裁剪。
- `users` 表仅保留 `user_id`, `name`, `country`，忽略 `age` 字段。

3. **常量折叠（Constant Folding）**  表达式 `order_date >= '2024-01-01'` 中的字符串 `'2024-01-01'` 会被转换为日期类型常量（如 `2024-01-01 00:00:00`），避免运行时重复解析。

## 物理算子

了解物理计划前，要先明白什么是物理算子。

1. ((20250527205757-b6b6n9h '物理算子是什么？是某种框架的 API 吗？'))
2. ((20250527205757-x2qz44i '为什么要使用物理算子，而不是直接使用优化后的逻辑算子'))】
3. ((20250527205757-0zchep1 '物理算子有哪些？分别有什么样的作用呢？'))】
4. 只有 SparkSQL 用到了物理算子吗？还是用 RDD 算子也使用到了物理算子呢？

### 物理算子是什么？是某种框架的 API 吗？

在 Spark 中，**物理算子（Physical Operators）**  是为最终执行设计的底层操作单元，负责将逻辑计划转换为具体的、可执行的步骤。它们并非某种语言的 API，而是 Spark 内部执行引擎的组成部分，直接对应到分布式计算任务的具体实现。

- **用户不直接调用物理算子**：用户通过 DataFrame、SQL 或 RDD 等高级 API 编写逻辑（例如 `df.filter()` 或 `SELECT * FROM table`），由 Catalyst 优化器将这些逻辑转换为物理算子。
- **内部实现**：物理算子是 Spark 执行引擎的内部组件，例如 `FileSourceScanExec` 类是 Spark 源码中的一个 Scala 类，负责生成文件扫描的 RDD。
- **查看物理计划**：用户可以通过 `df.explain(mode="formatted")` 或 Spark UI 查看物理计划中的物理算子，但通常不需要直接操作它们。

### 为什么要使用物理算子，而不是直接使用优化后的逻辑算子

每个物理算子会生成对应的 **RDD 操作链**：

**物理算子树 → 转换为 RDD DAG → 分解为 Stage 和 Task → 在 Executor 上执行**

### 物理算子有哪些？分别有什么样的作用呢？

以下是一些常见的物理算子及其作用：

| **物理算子** | **作用**                                     |
| -- | -------------------------------------- |
| `FileSourceScanExec` | 从文件（Parquet、CSV 等）中读取数据  |
| `FilterExec` | 对数据行按条件过滤                   |
| `ProjectExec` | 选择或计算特定列（类似 SELECT 语句） |
| `SortMergeJoinExec` | 基于排序的 Join 操作（大表 Join）    |
| `BroadcastHashJoinExec` | 广播小表实现 Hash Join（小表 Join）  |
| `HashAggregateExec` | 内存中的哈希聚合（如 GROUP BY）      |
| `SortExec` | 对数据排序                           |
| `ShuffleExchangeExec` | 数据重分区（Shuffle）                |

# 代码生成

在 Spark 中，物理执行计划生成后，代码生成（Code Generation）是将物理算子树转换为**可执行的字节码**的关键步骤，这是通过 **Tungsten 优化引擎** 实现的。这一过程大幅减少了虚函数调用和中间数据结构的开销，显著提高了 CPU 效率。

## 1. 代码生成的核心机制

Spark 使用 **Whole-Stage Code Generation（全阶段代码生成）**  技术，将**多个物理算子合并为一个 Java 方法**，直接生成高度优化的代码。以下是具体流程：

### 步骤 1：物理算子的遍历

- 物理执行计划树由多个物理算子（如 `FilterExec`、`ProjectExec`、`HashAggregateExec`）组成。
- Spark 遍历物理算子树，识别可以合并为单个代码生成阶段的连续算子（称为一个  **"Stage"** ）。

### 步骤 2：生成代码片段

- 对每个 Stage 内的算子，Spark 动态生成 **Java 代码**：

  - 例如，`Filter(condition)` 和 `Project(expressions)` 两个连续算子会被合并为一个循环：

```java
for (Row row : inputRows) {
    if (row.getInt(0) > 30) {          // Filter 的 condition
        outputRow.setInt(0, row.getInt(0) * 2); // Project 的表达式
        emit(outputRow);
    }
}
```

- 代码直接操作二进制数据（Tungsten 内存格式），跳过 Spark 内部的 Row 对象封装。

### 步骤 3：编译字节码

- 生成的 Java 代码通过 **Janino 编译器** 编译为 **JVM 字节码**。
- 编译后的类通过反射加载到 JVM 中，最终生成一个实现了 `RDD[InternalRow]` 的计算逻辑的类。

### 步骤 4：执行生成的代码

- 生成的代码直接操作 `UnsafeRow`（基于堆外内存的二进制格式），避免序列化/反序列化开销。
- **最终将生成的代码逻辑绑定到 RDD 的**  `compute()` **方法中，由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。**

# 常用操作

## 显示所有阶段的执行计划

使用explain方法查看所有阶段，可以通过`explain(extended=True)`一次性查看所有阶段的计划

```python
df = spark.sql("SELECT * FROM range(10) WHERE id > 5")
display(df)
print("=== Parsed Logical Plan ===")
df.explain(extended=True)
```

```scala
=== Parsed Logical Plan ===
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('id > 5)
  +- 'UnresolvedTableValuedFunction [range], [10], false

== Analyzed Logical Plan ==
id: bigint
Project [id#72430L]
+- Filter (id#72430L > cast(5 as bigint))
  +- Range (0, 10, step=1, splits=None)

== Optimized Logical Plan ==
Filter (id#72430L > 5)
  +- Range (0, 10, step=1, splits=None)

== Physical Plan ==
*(1) Filter (id#72430L > 5)
  +- *(1) Range (0, 10, step=1, splits=8)
```

# 问题与回答

## 为什么 RDD 的动作算子才会执行，而逻辑算子并不会执行？

在最终将生成的代码逻辑绑定到 RDD 的  compute() 方法中，由 Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

通过物理计划生成代码后，将代码绑定到RDD 的 `compute()` 方法中，再由Executor 执行。所以只有 RDD 的动作算子才会真正的运行。

## DAG 通过 Shuffle 进行划分 Stage，本质是什么样子的呢？

我们都知道 DAG Schedular 通过 Shuffle 将 DAG 划分为 Stage，那么本质上是如何实现的呢？

首先，我们需要知道Stage 的划分是在 **DAG Scheduler** 处理物理执行计划时完成的，属于**任务调度阶段。** 物理执行计划中有`ShuffleExchangeExec` 这个物理算子，所以很显然，通过物理执行计划中的`ShuffleExchangeExec` 算子，将 DAG 划分为 Stage。

- Stage 分为两种类型：

│     └─ FileSourceScanExec
└─ ShuffleExchangeExec (右表分区)
   └─ ProjectExec
      └─ FileSourceScanExec
```

- **Stage 划分**：

  - 每个 `ShuffleExchangeExec` 会生成一个独立的 **ShuffleMapStage**。
  - `SortMergeJoinExec` 和后续操作（如有）会生成 **ResultStage**。